package kfk;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Minimal Kafka publish/subscribe demo: registers an asynchronous consumer
 * callback on one or more topics and publishes test messages to them.
 * Uses the modern {@code KafkaConsumer}/{@code KafkaProducer} API (the linked
 * page describes the legacy 0.8.0 SimpleConsumer, kept for historical reference):
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 */
public class EventSubscriber {
    // Consumer configuration shared by every subscriber thread.
    private static final Properties props4Sub;
    // Producer configuration used by the lazily created singleton producer.
    private static final Properties props4Pub;
    // One worker thread per registered subscriber; a cached pool grows on demand.
    // (Renamed from "singleThreadExecutor", which misdescribed the pool type.)
    private static final ExecutorService subscriberExecutor = Executors.newCachedThreadPool();
    // Cooperative shutdown flag observed by all consumer loops.
    private static volatile boolean isStopAll = false;

    static {
        props4Sub = new Properties();
        props4Sub.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaClientConfig.ServerUrl);
        props4Sub.put(ConsumerConfig.GROUP_ID_CONFIG, "zookeeper");
        props4Sub.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props4Sub.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props4Sub.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props4Sub.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props4Pub = new Properties();
        props4Pub.put("bootstrap.servers", KafkaClientConfig.ServerUrl);
        props4Pub.put("acks", "all");
        props4Pub.put("retries", 0);
        props4Pub.put("batch.size", 16384);
        props4Pub.put("linger.ms", 1);
        props4Pub.put("buffer.memory", 33554432);
        props4Pub.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props4Pub.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    /**
     * Demo driver: subscribes a no-op callback to the test channel, then
     * publishes 100 messages, one per second.
     */
    public static void main(String[] args) {
        EventSubscriber.registerAsyncSubscriberCallBack(new IEventSubscriber() {
            @Override
            public void onMessage(String channel, String key) {
                // Demo subscriber: received messages are already logged by the consumer loop.
            }
        }, KafkaClientConfig.TestChannel);
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop publishing instead of
                // swallowing the interruption and looping on.
                Thread.currentThread().interrupt();
                break;
            }
            EventSubscriber.pub(KafkaClientConfig.TestChannel, "hi :" + i);
        }
    }

    /**
     * Starts a new thread that polls the given channels and forwards every
     * record to {@code eventSubscriber}. The loop runs until {@code isStopAll}
     * is set; the consumer is always closed when the loop exits.
     *
     * @param eventSubscriber callback invoked with (topic, value) per record
     * @param channel         one or more topic names to subscribe to
     */
    public static void registerAsyncSubscriberCallBack(final IEventSubscriber eventSubscriber, final String... channel) {
        subscriberExecutor.execute(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(EventSubscriber.props4Sub);
                try {
                    consumer.subscribe(Arrays.asList(channel));
                    while (!EventSubscriber.isStopAll) {
                        ConsumerRecords<String, String> records = consumer.poll(KafkaClientConfig.TIMEOUT_POLL);
                        for (ConsumerRecord<String, String> record : records) {
                            eventSubscriber.onMessage(record.topic(), record.value());
                            System.out.printf("[MQ->RECV] channel = %s ,offset = %d, K = %s, V = %s at %d %n", record.topic(), record.offset(), record.key(), record.value(), System.currentTimeMillis());
                        }
                    }
                } finally {
                    // Release the consumer's sockets/buffers even if poll() throws.
                    consumer.close();
                }
            }
        });
    }

    // Lazily created singleton producer; volatile so the double-checked
    // locking in pub() publishes the instance safely across threads.
    volatile static Producer<String, String> producer;

    /**
     * Publishes {@code value} to {@code channel} with the fixed key "umsp",
     * logging success or failure asynchronously, then flushes the producer.
     *
     * @param channel topic to publish to
     * @param value   message payload
     */
    public static void pub(final String channel, final String value) {
        if (producer == null) {
            synchronized (EventSubscriber.class) {
                // Re-check under the lock: without this second null check the
                // original code let racing threads each build a KafkaProducer,
                // leaking every instance but the last one assigned.
                if (producer == null) {
                    producer = new KafkaProducer<String, String>(EventSubscriber.props4Pub);
                }
            }
        }
        producer.send(new ProducerRecord<String, String>(channel, "umsp", value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                    System.out.println("[MQ-SEND] fail, channel:" + channel + ",value:" + value);
                } else {
                    System.out.println("[MQ-SEND] success, we just sent is: " + metadata.offset());
                }
            }
        });
        // Force the record out immediately; this demo sends one message per
        // second, so batching would only delay delivery.
        producer.flush();
    }
}